package com.shujia.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class Demo03FileSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * Flink新版读文件的方式
         */
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat()
                        , new Path("flink/data/words/")
                )
                // 可以指定一个间隔时间监控目录下的文件变化，最终会将读文件从有界流变成无界流
                .monitorContinuously(Duration.ofSeconds(5))
                .build();

        /*
         * fromSource：添加Flink自带的Source
         * addSource：添加自定义或者是第三方的Source
         */
        DataStreamSource<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
        fileDS.print();

        env.execute();


    }
}
